package rocketmq;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendCallback;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * mq生产者
 * Created by Administrator on 2018/7/2.
 */
public class Producer {
    public static void main(String[] args) {
        // 创建一个Produer Group
        DefaultMQProducer producer = new DefaultMQProducer("producer_group_name2");
        // 指定NameServer地址,多个节点之间使用分号间隔
        producer.setNamesrvAddr("192.168.1.103:9876");
        try {
            // 启动producer
            producer.start();
            for (int i = 0;i < 100; i++){
                // 创建一个Message ,并指定topic、Tag和消息主体
                Message msg = new Message("topic_ums2", "tag_register", (System.currentTimeMillis()+" 这是第 "+i+" 条消息bbbbb ").getBytes(RemotingHelper.DEFAULT_CHARSET));
                //向broker发送消息
                //1.可靠的同步传输
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                        System.out.println(list.size());
                        return list.get(0);
                    }
                }, "123");//123会被传到第二个参数MessageQueueSelector中的方法形参中(也就是Object o),通常我们将自身业务相关的属性传入,用来选择对应的队列
                System.out.println("消息结果: "+sendResult);
                //2.可靠的异步传输, 通过传入匿名的回调类
                /*producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {//成功回调
                        System.out.println("消息发送成功:" + sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {//异常回调
                        System.out.println("消息发送异常:");
                        throwable.printStackTrace();
                    }
                });*/
                //单向传输, 中等可靠性,例如日志收集
                //producer.sendOneway(msg);
            }

        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //当produer不再使用时,关闭produer
        producer.shutdown();
    }

}
